{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Visualizing Customer addresses on a flood plane\n",
    "\n",
    "King County (WA) publishes flood plain data as well as tax parcel data. We can use the addresses in the tax parcel data and use the geocoder to calculate coordinates. Using this coordinates and the flood plain data we can enrich out dataset with a flag indicating whether the house is in a flood zone or not.\n",
    "\n",
    "The following data has been sourced from King County's Open data portal. [_Link_](https://data.kingcounty.gov/)\n",
    "1. [Address Data](https://mmlspark.blob.core.windows.net/publicwasb/maps/KingCountyAddress.csv)\n",
    "1. [Flood plains](https://mmlspark.blob.core.windows.net/publicwasb/maps/KingCountyFloodPlains.geojson)\n",
    "\n",
    "For this demonstration, please follow the instructions on setting up your azure maps account from the overview notebook."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "1. Upload the flood plains data as map data to your creator resource"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import time\n",
    "import requests\n",
    "from requests.adapters import HTTPAdapter\n",
    "from requests.packages.urllib3.util.retry import Retry\n",
    "\n",
    "# Configure more resiliant requests to stop flakiness\n",
    "retry_strategy = Retry(\n",
    "    total=3,\n",
    "    status_forcelist=[429, 500, 502, 503, 504],\n",
    "    allowed_methods=[\"HEAD\", \"GET\", \"PUT\", \"DELETE\", \"OPTIONS\", \"TRACE\"],\n",
    ")\n",
    "adapter = HTTPAdapter(max_retries=retry_strategy)\n",
    "http = requests.Session()\n",
    "http.mount(\"https://\", adapter)\n",
    "http.mount(\"http://\", adapter)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.core.platform import *\n",
    "\n",
    "# Azure Maps account key\n",
    "maps_key = find_secret(\n",
    "    secret_name=\"azuremaps-api-key\", keyvault=\"mmlspark-build-keys\"\n",
    ")  # Replace this with your azure maps key\n",
    "\n",
    "# Creator Geo prefix\n",
    "# for this example, assuming that the creator resource is created in `EAST US 2`.\n",
    "atlas_geo_prefix = \"us\"\n",
    "\n",
    "# Load flood plains data\n",
    "flood_plain_geojson = http.get(\n",
    "    \"https://mmlspark.blob.core.windows.net/publicwasb/maps/KingCountyFloodPlains.geojson\"\n",
    ").content\n",
    "\n",
    "# Upload this flood plains data to your maps/creator account. This is a Long-Running async operation and takes approximately 15~30 seconds to complete\n",
    "r = http.post(\n",
    "    f\"https://{atlas_geo_prefix}.atlas.microsoft.com/mapData/upload?api-version=1.0&dataFormat=geojson&subscription-key={maps_key}\",\n",
    "    json=json.loads(flood_plain_geojson),\n",
    ")\n",
    "\n",
    "# Poll for resource upload completion\n",
    "resource_location = r.headers.get(\"location\")\n",
    "for _ in range(20):\n",
    "    resource = json.loads(\n",
    "        http.get(f\"{resource_location}&subscription-key={maps_key}\").content\n",
    "    )\n",
    "    status = resource[\"status\"].lower()\n",
    "    if status == \"running\":\n",
    "        time.sleep(5)  # wait in a polling loop\n",
    "    elif status == \"succeeded\":\n",
    "        break\n",
    "    else:\n",
    "        raise ValueError(\"Unknown status {}\".format(status))\n",
    "\n",
    "# Once the above operation returns a HTTP 201, get the user_data_id of the flood plains data, you uploaded to your map account.\n",
    "user_data_id_resource_url = resource[\"resourceLocation\"]\n",
    "user_data_id = json.loads(\n",
    "    http.get(f\"{user_data_id_resource_url}&subscription-key={maps_key}\").content\n",
    ")[\"udid\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have the flood plains data setup in our maps account, we can use the `CheckPointInPolygon` function to check if a location `(lat,lon)` coordinate is in a flood zone.\n",
    "\n",
    "### Load address data:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = spark.read.option(\"header\", \"true\").csv(\n",
    "    \"wasbs://publicwasb@mmlspark.blob.core.windows.net/maps/KingCountyAddress.csv\"\n",
    ")\n",
    "\n",
    "# Visualize incoming schema\n",
    "print(\"Schema:\")\n",
    "data.printSchema()\n",
    "\n",
    "# Choose a subset of the data for this example\n",
    "subset_data = data.limit(50)\n",
    "display(subset_data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Wire-up the Address Geocoder\n",
    "\n",
    "We will use the address geocoder to enrich the dataset with location coordinates of the addresses."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import col\n",
    "from synapse.ml.stages import FixedMiniBatchTransformer, FlattenBatch\n",
    "from synapse.ml.services.geospatial import *\n",
    "\n",
    "\n",
    "def extract_location_fields(df):\n",
    "    # Use this function to select only lat/lon columns into the dataframe\n",
    "    return df.select(\n",
    "        col(\"*\"),\n",
    "        col(\"output.response.results\")\n",
    "        .getItem(0)\n",
    "        .getField(\"position\")\n",
    "        .getField(\"lat\")\n",
    "        .alias(\"Latitude\"),\n",
    "        col(\"output.response.results\")\n",
    "        .getItem(0)\n",
    "        .getField(\"position\")\n",
    "        .getField(\"lon\")\n",
    "        .alias(\"Longitude\"),\n",
    "    ).drop(\"output\")\n",
    "\n",
    "\n",
    "# Azure Maps geocoder to enhance the dataframe with location data\n",
    "geocoder = (\n",
    "    AddressGeocoder()\n",
    "    .setSubscriptionKey(maps_key)\n",
    "    .setAddressCol(\"FullAddress\")\n",
    "    .setOutputCol(\"output\")\n",
    ")\n",
    "\n",
    "# Set up a fixed mini batch transformer to geocode addresses\n",
    "batched_dataframe = geocoder.transform(\n",
    "    FixedMiniBatchTransformer().setBatchSize(10).transform(subset_data.coalesce(1))\n",
    ")\n",
    "geocoded_addresses = extract_location_fields(\n",
    "    FlattenBatch().transform(batched_dataframe)\n",
    ")\n",
    "\n",
    "# Display the results\n",
    "display(geocoded_addresses)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have geocoded the addresses, we can now use the `CheckPointInPolygon` function to check if a property is in a flood zone or not.\n",
    "\n",
    "### Setup Check Point In Polygon "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def extract_point_in_polygon_result_fields(df):\n",
    "    # Use this function to select only lat/lon columns into the dataframe\n",
    "    return df.select(\n",
    "        col(\"*\"),\n",
    "        col(\"output.result.pointInPolygons\").alias(\"In Polygon\"),\n",
    "        col(\"output.result.intersectingGeometries\").alias(\"Intersecting Polygons\"),\n",
    "    ).drop(\"output\")\n",
    "\n",
    "\n",
    "check_point_in_polygon = (\n",
    "    CheckPointInPolygon()\n",
    "    .setSubscriptionKey(maps_key)\n",
    "    .setGeography(atlas_geo_prefix)\n",
    "    .setUserDataIdentifier(user_data_id)\n",
    "    .setLatitudeCol(\"Latitude\")\n",
    "    .setLongitudeCol(\"Longitude\")\n",
    "    .setOutputCol(\"output\")\n",
    ")\n",
    "\n",
    "\n",
    "flood_plain_addresses = extract_point_in_polygon_result_fields(\n",
    "    check_point_in_polygon.transform(geocoded_addresses)\n",
    ")\n",
    "\n",
    "# Display the results\n",
    "display(flood_plain_addresses)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cleanup Uploaded User Data (Optional)\n",
    "You can (optionally) delete the uploaded geojson polygon."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "res = http.delete(\n",
    "    f\"https://{atlas_geo_prefix}.atlas.microsoft.com/mapData/{user_data_id}?api-version=1.0&subscription-key={maps_key}\"\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
